Solutions/Mimecast/Data Connectors/MimecastTTP/SharedCode/utils.py (733 lines of code) (raw):

"""Utils File.""" import inspect import requests import datetime import json from json.decoder import JSONDecodeError from SharedCode.state_manager import StateManager from SharedCode.mimecast_exception import MimecastException from SharedCode.logger import applogger from SharedCode import consts from tenacity import ( retry, stop_after_attempt, wait_exponential, retry_if_exception_type, retry_if_result, retry_any, RetryError, ) from requests.exceptions import ConnectionError def retry_on_status_code(response): """Check and retry based on a list of status codes. Args: response (): API response is passed Returns: Bool: if given status code is in list then true else false """ __method_name = inspect.currentframe().f_code.co_name if isinstance(response, dict): return False if response.status_code in consts.RETRY_STATUS_CODE: applogger.info( "{}(method={}) : Retrying due to status code : {}".format( consts.LOGS_STARTS_WITH, __method_name, response.status_code ) ) return True return False class Utils: """Utils Class.""" def __init__(self, azure_function_name) -> None: """Init Function.""" self.azure_function_name = azure_function_name self.log_format = consts.LOG_FORMAT self.headers = {} def check_environment_var_exist(self, environment_var): """Check the existence of required environment variables. Logs the validation process and completion. Raises MimecastException if any required field is missing. Args: environment_var(list) : variables to check for existence """ __method_name = inspect.currentframe().f_code.co_name try: applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Validating Environment Variables", ) ) missing_required_field = False for var in environment_var: key, val = next(iter(var.items())) if not val: missing_required_field = True applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Environment variable {} is not set".format(key), ) ) if missing_required_field: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Validation failed", ) ) raise MimecastException() if not consts.BASE_URL.startswith("https://"): applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, '"BaseURL" must start with ”https://”', ) ) raise MimecastException() applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Validation Complete", ) ) except MimecastException: raise MimecastException() except Exception as err: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(err), ) ) raise MimecastException() def get_checkpoint_data(self, checkpoint_obj: StateManager, load_flag=True): """Get checkpoint data from a StateManager object. Args: checkpoint_obj (StateManager): The StateManager object to retrieve checkpoint data from. load_flag (bool): A flag indicating whether to load the data as JSON (default is True). Returns: The retrieved checkpoint data. """ __method_name = inspect.currentframe().f_code.co_name try: applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Fetching checkpoint data", ) ) checkpoint_data = checkpoint_obj.get() if load_flag and checkpoint_data: checkpoint_data = json.loads(checkpoint_data) applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Checkpoint data = {}".format(checkpoint_data), ) ) return checkpoint_data except json.decoder.JSONDecodeError as json_error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.JSON_DECODE_ERROR_MSG.format(json_error), ) ) raise MimecastException() except Exception as err: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(err), ) ) raise MimecastException() def post_checkpoint_data(self, checkpoint_obj: StateManager, data, dump_flag=True): """Post checkpoint data. It post the data to a checkpoint object based on the dump_flag parameter. Args: checkpoint_obj (StateManager): The StateManager object to post data to. data: The data to be posted. dump_flag (bool): A flag indicating whether to dump the data as JSON before posting (default is True). """ __method_name = inspect.currentframe().f_code.co_name try: applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Posting checkpoint data = {}".format(data), ) ) if dump_flag: checkpoint_obj.post(json.dumps(data)) else: checkpoint_obj.post(data) applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Data posted to azure storage", ) ) except TypeError as type_error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.TYPE_ERROR_MSG.format(type_error), ) ) raise MimecastException() except Exception as err: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(err), ) ) raise MimecastException() @retry( stop=stop_after_attempt(consts.MAX_RETRIES), wait=wait_exponential( multiplier=consts.BACKOFF_MULTIPLIER, min=consts.MIN_SLEEP_TIME, max=consts.MAX_SLEEP_TIME, ), retry=retry_any( retry_if_result(retry_on_status_code), retry_if_exception_type(ConnectionError), ), before_sleep=lambda retry_state: applogger.error( "{}(method = {}) : Retring after {} secends, attempt number: {} ".format( consts.LOGS_STARTS_WITH, " Retry Decorator", retry_state.upcoming_sleep, retry_state.attempt_number, ) ), ) def make_rest_call( self, method, url, params=None, data=None, json=None, check_retry=True ): """Make a rest call. Args: url (str): The URL to make the call to. method (str): The HTTP method to use for the call. params (dict, optional): The parameters to pass in the call (default is None). data (dict, optional): The body(in x-www-form-urlencoded formate) of the request (default is None). json (dict, optional): The body(in row formate) of the request (default is None). check_retry (bool, optional): A flag indicating whether to check for retry (default is True). Returns: dict: The JSON response if the call is successful. """ __method_name = inspect.currentframe().f_code.co_name try: applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Rest Call, Method :{}, url: {}".format(method, url), ) ) response = requests.request( method, url, headers=self.headers, params=params, data=data, json=json, timeout=consts.MAX_TIMEOUT_SENTINEL, ) if response.status_code >= 200 and response.status_code <= 299: response_json = response.json() applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Success, Status code : {}".format(response.status_code), ) ) self.handle_failed_response_for_success(response_json) return response_json elif response.status_code == 400: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Bad Request = {}, Status code : {}".format( response.text, response.status_code ), ) ) self.handle_failed_response_for_failure(response) elif response.status_code == 401: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Unauthorized, Status code : {}".format(response.status_code), ) ) response_json = response.json() fail_json = response_json.get("fail", []) error_code = None error_message = None if fail_json: error_code = fail_json[0].get("code") error_message = fail_json[0].get("message") if check_retry: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Generating new token, Error message = {}, Error code = {}".format( error_message, error_code ), ) ) check_retry = False self.authenticate_mimecast_api(check_retry) return self.make_rest_call( method, url, params, data, json, check_retry ) else: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Max retry reached for generating access token," "Error message = {}, Error code = {}".format( error_message, error_code ), ) ) raise MimecastException() elif response.status_code == 403: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Forbidden, Status code : {}".format(response.status_code), ) ) self.handle_failed_response_for_failure(response) elif response.status_code == 404: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Not Found, URL : {}, Status code : {}".format( url, response.status_code ), ) ) raise MimecastException() elif response.status_code == 409: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Conflict, Status code : {}".format(response.status_code), ) ) self.handle_failed_response_for_failure(response) elif response.status_code == 429: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Too Many Requests, Status code : {} ".format( response.status_code ), ) ) return response elif response.status_code == 500: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Internal Server Error, Status code : {}".format( response.status_code ), ) ) return self.handle_failed_response_for_failure(response) applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Unexpected Error = {}, Status code : {}".format( response.text, response.status_code ), ) ) raise MimecastException() except MimecastException: raise MimecastException() except requests.exceptions.Timeout as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.TIME_OUT_ERROR_MSG.format(error), ) ) raise MimecastException() except JSONDecodeError as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.JSON_DECODE_ERROR_MSG.format( "{}, API Response = {}".format(error, response.text) ), ) ) raise MimecastException() except requests.ConnectionError as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.CONNECTION_ERROR_MSG.format(error), ) ) raise ConnectionError() except requests.RequestException as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.REQUEST_ERROR_MSG.format(error), ) ) raise MimecastException() except Exception as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(error), ) ) raise MimecastException() def handle_failed_response_for_failure(self, response): """Handle the failed response for failure status codes. If request get authentication error it will regenerate the access token. Args: response_json (dict): The JSON response from the API. """ __method_name = inspect.currentframe().f_code.co_name try: response_json = response.json() error_message = response_json fail_json = response_json.get("fail", []) error_json = response_json.get("error") if fail_json: error_message = fail_json[0].get("message") elif error_json: error_message = error_json.get("message") applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, error_message, ) ) if response.status_code in consts.EXCEPTION_STATUS_CODE: raise MimecastException() return response except MimecastException: raise MimecastException() except Exception as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(error), ) ) raise MimecastException() def handle_failed_response_for_success(self, response_json): """Handle the failed response for a successful request. Check if there is failure in success response or not. Args: response_json (dict): The JSON response from the request. """ __method_name = inspect.currentframe().f_code.co_name try: fail_json = response_json.get("fail", []) if fail_json: try: error_message = fail_json[0].get("errors")[0].get("message") except (KeyError, IndexError, ValueError, TypeError): error_message = fail_json applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Failed response message = {}".format(error_message), ) ) raise MimecastException() else: applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "No failed response found", ) ) return except MimecastException: raise MimecastException() except Exception as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(error), ) ) raise MimecastException() def authenticate_mimecast_api(self, check_retry=True): """Authenticate mimecast endpoint generate access token and update header. Args: check_retry (bool): Flag for retry of generating access token. """ __method_name = inspect.currentframe().f_code.co_name try: body = { "client_id": consts.MIMECAST_CLIENT_ID, "client_secret": consts.MIMECAST_CLIENT_SECRET, "grant_type": "client_credentials", } applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Generating access token", ) ) self.headers = {} url = "{}{}".format(consts.BASE_URL, consts.ENDPOINTS["OAUTH2"]) response = self.make_rest_call( method="POST", url=url, data=body, check_retry=check_retry ) if "access_token" in response: access_token = response.get("access_token") self.headers.update( { "Content-Type": "application/json", "Authorization": "Bearer {}".format(access_token), } ) applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Successfully generated access token and header updated", ) ) return applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Error occurred while fetching the access token from the response = {}".format( response ), ) ) raise MimecastException() except MimecastException: raise MimecastException() except RetryError as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.MAX_RETRY_ERROR_MSG.format( error, error.last_attempt.exception() ), ) ) raise MimecastException() except KeyError as key_error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.KEY_ERROR_MSG.format(key_error), ) ) raise MimecastException() except Exception as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(error), ) ) raise MimecastException() def get_from_date_to_date_page_token(self, checkpoint_obj): """Get the from date, to date, and page token from the checkpoint data. If data is not available in checkpoint file, then get the start date from user input. If user input is not available or invalid then set from date's default value. Returns: Tuple[str, str, str]: A tuple containing the from date, to date, and page token. """ __method_name = inspect.currentframe().f_code.co_name try: checkpoint_data = self.get_checkpoint_data(checkpoint_obj) from_date = None page_token = "" to_date = None if not checkpoint_data: start_date = self.get_start_date_of_data_fetching() applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Checkpoint data is not available, Start fetching data from = {}".format( start_date ), ) ) from_date = start_date to_date = datetime.datetime.now(datetime.timezone.utc).strftime( consts.DATE_TIME_FORMAT ) else: from_date = checkpoint_data.get("from_date") page_token = checkpoint_data.get("page_token") to_date = checkpoint_data.get("to_date") if (not page_token and from_date) or (not to_date): to_date = datetime.datetime.now(datetime.timezone.utc).strftime( consts.DATE_TIME_FORMAT ) if not from_date: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "From date is not available in checkpoint, User has manually changed checkpoint", ) ) raise MimecastException() return from_date, to_date, page_token except MimecastException: raise MimecastException() except ValueError as err: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.VALUE_ERROR_MSG.format(err), ) ) raise MimecastException() except Exception as err: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(err), ) ) raise MimecastException() def get_start_date_of_data_fetching(self): """Retrieve the start date for data fetching. If no start date is provided, it calculates the start date based on a default lookup day. If the provided start date is invalid, it will fail and raise an exception. Returns: str: The start date for data fetching in the format specified by consts.DATE_TIME_FORMAT. """ __method_name = inspect.currentframe().f_code.co_name try: if not consts.START_DATE: start_date = ( datetime.datetime.utcnow() - datetime.timedelta(days=consts.DEFAULT_LOOKUP_DAY) ).strftime(consts.DATE_TIME_FORMAT) return start_date try: start_date = datetime.datetime.strptime( consts.START_DATE, "%Y-%m-%d" ).strftime(consts.DATE_TIME_FORMAT) applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Start date given by user is {}".format(start_date), ) ) # * if start date is future date, raise exception if start_date > datetime.datetime.utcnow().strftime( consts.DATE_TIME_FORMAT ): applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Start date given by user is future date", ) ) raise MimecastException() return start_date except ValueError: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Start date given by user is not valid", ) ) raise MimecastException() except MimecastException: raise MimecastException() except Exception as err: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(err), ) ) raise MimecastException() def iso_to_epoch_int(self, date_time): """Convert an ISO formatted date and time string to epoch time. Args: date_time (str): The input date and time string in the format "%Y-%m-%dT%H:%M:%SZ" Returns: int: The epoch time as a integer. """ __method_name = inspect.currentframe().f_code.co_name try: date_time_obj = datetime.datetime.strptime( date_time, consts.DATE_TIME_FORMAT ) epoch_time = date_time_obj.timestamp() return epoch_time except TypeError as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.TYPE_ERROR_MSG.format(error), ) ) raise MimecastException() except ValueError as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.VALUE_ERROR_MSG.format(error), ) ) raise MimecastException() except Exception as err: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(err), ) ) raise MimecastException()